[Hadoop] How the number of splits is determined

Posted by 李玉坤 on 2017-08-30
split: a split is a logical slice of the input. Before the map tasks of a MapReduce job start, each input file is divided into pieces of a specified size; each piece is called a split. By default the split size equals the block size, i.e. 128 MB.

The split size is determined by minSize, maxSize and blockSize. Let's trace through the code, using WordCount as the example.

Find job.waitForCompletion(true); and step into waitForCompletion:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {

     public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          Job job = Job.getInstance(conf);

           // notice
           job.setJarByClass(WordCount.class);

           // set the mapper's properties
           job.setMapperClass(WCMapper.class);
           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(LongWritable.class);
          FileInputFormat.setInputPaths(job, new Path(args[0]));

           // set the reducer's properties
           job.setReducerClass(WCReducer.class);
           job.setOutputKeyClass(Text.class);
           job.setOutputValueClass(LongWritable.class);
          FileOutputFormat.setOutputPath(job, new Path(args[1]));

           // submit
          job.waitForCompletion(true);
     }

}
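The driver above references WCMapper and WCReducer, which are not shown in this post. A minimal sketch of what they could look like, assuming the classic word-count logic and matching the Text/LongWritable output types set in the driver (in practice each class would live in its own file):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical mapper: split each line into words and emit (word, 1).
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Text word = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String w : value.toString().split("\\s+")) {
            if (!w.isEmpty()) {
                word.set(w);
                context.write(word, one);
            }
        }
    }
}

// Hypothetical reducer: sum the counts for each word.
class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();
        }
        context.write(key, new LongWritable(sum));
    }
}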

Inside waitForCompletion(true), find submit():

 /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

Inside submit(), find return submitter.submitJobInternal(Job.this, cluster);

/**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

Inside submitJobInternal(), find int maps = writeSplits(job, submitJobDir);

 /**
   * Internal method for submitting jobs to the system.
   *
   * <p>The job submission process involves:
   * <ol>
   *   <li>
   *   Checking the input and output specifications of the job.
   *   </li>
   *   <li>
   *   Computing the {@link InputSplit}s for the job.
   *   </li>
   *   <li>
   *   Setup the requisite accounting information for the
   *   {@link DistributedCache} of the job, if necessary.
   *   </li>
   *   <li>
   *   Copying the job's jar and configuration to the map-reduce system
   *   directory on the distributed file-system.
   *   </li>
   *   <li>
   *   Submitting the job to the <code>JobTracker</code> and optionally
   *   monitoring its status.
   *   </li>
   * </ol></p>
   * @param job the configuration to submit
   * @param cluster the handle to the Cluster
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  JobStatus submitJobInternal(Job job, Cluster cluster)
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers",
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);

      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {

          int keyLen = CryptoUtils.isShuffleEncrypted(conf)
              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
              : SHUFFLE_KEY_LENGTH;
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(keyLen);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }

      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);

      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }

Inside writeSplits(), find maps = writeNewSplits(job, jobSubmitDir);

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf) job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

Inside writeNewSplits(), find List<InputSplit> splits = input.getSplits(job);

private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

By default job.getInputFormatClass() resolves to TextInputFormat, which inherits getSplits() from FileInputFormat. Inside getSplits(), the following three lines are the key ones:

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

long splitSize = computeSplitSize(blockSize, minSize, maxSize);

/**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }
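Note the SPLIT_SLOP constant (1.1 in this version of FileInputFormat): the last chunk only becomes a separate split while the remaining bytes exceed splitSize by more than 10%. A small standalone sketch of that loop, using a hypothetical 260 MB file and a 128 MB split size:

// Hypothetical numbers, just to trace the while-loop above.
public class SplitSlopDemo {
    private static final double SPLIT_SLOP = 1.1;   // same value as FileInputFormat

    public static void main(String[] args) {
        long splitSize = 128L * 1024 * 1024;          // 128 MB
        long bytesRemaining = 260L * 1024 * 1024;     // 260 MB file

        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            System.out.println("split of " + splitSize + " bytes");
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            System.out.println("final split of " + bytesRemaining + " bytes");
        }
        // Prints one 128 MB split and one final 132 MB split:
        // 132 MB / 128 MB = 1.03 < 1.1, so the tail is not split again.
    }
}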

Analysis:
(1) getFormatMinSplitSize() returns 1.
(2) getMinSplitSize(job):

/**
   * Get the minimum split size
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

SPLIT_MINSIZE refers to the property mapreduce.input.fileinputformat.split.minsize, whose default is defined in mapred-default.xml inside hadoop-mapreduce-client-core-2.6.0.jar:

<property>
  <name>mapreduce.input.fileinputformat.split.minsize</name>
  <value>0</value>
  <description>The minimum size chunk that map input should be split
  into.  Note that some file formats may have minimum split sizes that
  take priority over this setting.</description>
</property>
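To override this for a single job, one option is to set the property on the Configuration before the job is created (a sketch; the 256 MB value is arbitrary):

// In the WordCount driver, before Job.getInstance(conf):
Configuration conf = new Configuration();
conf.setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024);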

(3) getMaxSplitSize(job):
SPLIT_MAXSIZE refers to the property mapreduce.input.fileinputformat.split.maxsize, which is not set in mapred-default.xml of hadoop-mapreduce-client-core-2.6.0.jar, so getLong() falls back to Long.MAX_VALUE (2^63 - 1). The maximum split size is therefore:

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                              Long.MAX_VALUE);
  }

(4) long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
Math.max(1, 0) = 1, so
minSize = 1.

(5) The computeSplitSize() method of FileInputFormat.java (line 435):

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

Analysis:
minSize = 1
maxSize = 2^63 - 1
blockSize = the block size of the file, 128 MB by default
Math.min(maxSize, blockSize) = 128 MB (134217728 bytes)
Math.max(1, 128 MB) = 128 MB
So by default one block corresponds to exactly one map task. The benefit is data locality: a map task can be scheduled on a node that holds its block, so the input does not have to be copied across the network to the map side.
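A quick standalone check of that arithmetic with the default values (a sketch reproducing the formula, not Hadoop code):

public class SplitSizeDefaults {
    // Same formula as FileInputFormat.computeSplitSize().
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long minSize = 1L;                       // Math.max(1, 0)
        long maxSize = Long.MAX_VALUE;           // property not set
        long blockSize = 128L * 1024 * 1024;     // default HDFS block size

        // Prints 134217728, i.e. 128 MB: split size == block size by default.
        System.out.println(computeSplitSize(blockSize, minSize, maxSize));
    }
}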

From these three parameters the split size can be computed, which leads to the following conclusion:
the split-to-block relationship does not have to be one-to-one, but by default it is one-to-one.

In FileInputFormat.getSplits() the input is split using the formula
Math.max(minSize, Math.min(maxSize, blockSize)), where maxSize defaults to Long.MAX_VALUE:
1. If minSize < blockSize < maxSize, the split size is blockSize (one split per block);
2. If blockSize < maxSize and blockSize < minSize, the split size is minSize (one split spans several blocks);
3. If blockSize > maxSize and maxSize > minSize, the split size is maxSize (one block is divided into several splits);
4. If blockSize > maxSize and maxSize < minSize, the split size is minSize (a degenerate configuration that does not occur in practice, since the requested minimum exceeds the maximum).

A small sketch evaluating these cases follows below.
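A minimal sketch that evaluates the formula for each case above (the byte values are made up purely for illustration):

public class SplitSizeCases {
    static long split(long blockSize, long minSize, long maxSize) {
        // Same expression used by FileInputFormat.computeSplitSize().
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long MB = 1024L * 1024;
        // Case 1: minSize < blockSize < maxSize  -> blockSize (128 MB)
        System.out.println(split(128 * MB, 1, Long.MAX_VALUE));
        // Case 2: blockSize < minSize            -> minSize   (256 MB)
        System.out.println(split(128 * MB, 256 * MB, Long.MAX_VALUE));
        // Case 3: blockSize > maxSize > minSize  -> maxSize   (64 MB)
        System.out.println(split(128 * MB, 1, 64 * MB));
        // Case 4: maxSize < minSize (degenerate) -> minSize   (96 MB)
        System.out.println(split(128 * MB, 96 * MB, 64 * MB));
    }
}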

When tuning, if you want to adjust the split size in order to control the number of map tasks, the rules are:

With the file size and minSize unchanged: to get more map tasks, decrease maxSize and the split shrinks.
With the file size and maxSize unchanged: to get fewer map tasks, increase minSize and the split grows (see the sketch below).
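With the new-API FileInputFormat, the two static helpers below set the corresponding properties from the driver (a sketch; the 64 MB / 256 MB values are arbitrary examples):

// In the WordCount driver, after Job.getInstance(conf):

// More map tasks: cap the split at 64 MB
// (sets mapreduce.input.fileinputformat.split.maxsize).
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

// Fewer map tasks: force splits of at least 256 MB
// (sets mapreduce.input.fileinputformat.split.minsize).
FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);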

Note: however the split size is tuned, a split can only contain data from a single file; this input format will not pack multiple small files into one split.